/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package iped.parsers.fork;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.tika.config.Field;
import org.apache.tika.exception.TikaException;
import org.apache.tika.fork.ParserFactoryFactory;
import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.mime.MediaType;
import org.apache.tika.parser.AbstractParser;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;

import iped.io.URLUtil;
import iped.utils.IOUtil;

/**
 * Parser that delegates the actual parsing work to forked server processes,
 * which are kept in a small pool and reused between parse calls.
 * <p>
 * Besides the regular constructors, a process-wide shared instance can be
 * obtained through {@link #getForkParser()} after it has been configured with
 * the static setters ({@link #setEnabled(boolean)},
 * {@link #setPluginDir(String)}, {@link #setPoolSize(int)} and
 * {@link #setServerMaxHeap(String)}).
 */
public class ForkParser extends AbstractParser {

    /** Serial version UID */
    private static final long serialVersionUID = -4962742892274663950L;

    /** Whether {@link #getForkParser()} hands out the shared instance at all. */
    private static boolean enabled;

    /** Value appended to {@code -Xmx} when building the shared instance's java command. */
    private static String serverMaxHeapMB = "512M";

    /**
     * Maximum number of forked processes. Note that this is a static setting
     * shared by every ForkParser instance.
     */
    private static int poolSize = 5;

    /** Plugin directory used when building the shared instance. */
    private static String plugin_dir = null;

    /**
     * Lazily created shared instance. Must be volatile because it is read outside
     * of the lock in {@link #getForkParser()}.
     */
    private static volatile ForkParser instance;

    // these are used by the legacy usage
    private final ClassLoader loader;

    private final Parser parser;

    // these are used when the server builds a parser via a directory
    // of jars, not via legacy bootstrap etc.
    private final Path tikaBin;

    private Path pluginDir;

    private final ParserFactoryFactory parserFactoryFactory;

    /** Java command line */
    private List<String> java = Arrays.asList("java", "-Xmx32m", "-Djava.awt.headless=true");

    /** Number of clients currently handed out (or being acquired); guarded by {@code this}. */
    private int currentlyInUse = 0;

    /** Idle clients waiting to be reused; guarded by {@code this}. */
    private final Queue<ForkClient> pool = new LinkedList<>();

    @Field
    private long serverPulseMillis = 1000;

    @Field
    private long serverParseTimeoutMillis = 60000;

    @Field
    private long serverWaitTimeoutMillis = 60000;

    @Field
    private int maxFilesProcessedPerClient = -1;

    /**
     * @return true if the shared fork parser has been enabled
     */
    public static boolean isEnabled() {
        return enabled;
    }

    /**
     * Enables or disables the shared fork parser returned by
     * {@link #getForkParser()}.
     *
     * @param enable
     *            true to enable
     */
    public static void setEnabled(boolean enable) {
        enabled = enable;
    }

    /**
     * Sets the plugin directory used when the shared instance is created.
     *
     * @param pluginDir
     *            path of the plugin directory
     */
    public static void setPluginDir(String pluginDir) {
        plugin_dir = pluginDir;
    }

    /**
     * Sets the maximum number of forked processes.
     *
     * @param size
     *            process pool size
     */
    public static void setPoolSize(int size) {
        poolSize = size;
    }

    /**
     * Sets the value appended to {@code -Xmx} in the java command of the shared
     * instance (e.g. {@code "512M"}).
     *
     * @param maxHeapMB
     *            max heap setting
     */
    public static void setServerMaxHeap(String maxHeapMB) {
        serverMaxHeapMB = maxHeapMB;
    }

    /**
     * Returns the shared ForkParser, creating it on first use.
     * <p>
     * The new parser is fully configured before it is stored in the shared field,
     * so concurrent callers never observe an instance that still carries the
     * default java command or timeouts.
     *
     * @return the shared instance, or null if fork parsing is not enabled
     */
    public static ForkParser getForkParser() {
        if (!enabled) {
            return null;
        }
        ForkParser result = instance;
        if (result == null) {
            synchronized (ForkParser.class) {
                result = instance;
                if (result == null) {
                    result = new ForkParser(getMainJarsPath(), new File(plugin_dir).toPath(),
                            new ParserFactoryFactory(ExternalParsingParserFactory.class.getName(),
                                    Collections.<String, String>emptyMap()));
                    result.setJavaCommand(getCommand(serverMaxHeapMB));
                    result.setServerParseTimeoutMillis(3600 * 1000);
                    result.setServerWaitTimeoutMillis(10 * 60 * 1000);
                    result.setMaxFilesProcessedPerServer(10000);
                    // publish only after the configuration above is complete
                    instance = result;
                }
            }
        }
        return result;
    }

    /**
     * Resolves the parent directory of the location that {@link URLUtil} reports
     * for this class (presumably the folder holding the main jars -- confirm
     * against URLUtil).
     *
     * @return the parent directory as a Path
     * @throws IllegalStateException
     *             if the reported URL cannot be converted to a URI
     */
    private static Path getMainJarsPath() {
        URL url = URLUtil.getURL(ForkParser.class);
        try {
            return new File(url.toURI()).getParentFile().toPath();
        } catch (URISyntaxException e) {
            // a null path would only fail later, when the first client is created,
            // with a far less helpful message
            throw new IllegalStateException("Unable to resolve the main jars directory from " + url, e);
        }
    }

    /**
     * Builds the java command line for the forked process: "java", the max heap
     * option and one {@code -Dkey=value} argument per current system property.
     *
     * @param maxHeap
     *            value appended to {@code -Xmx}
     * @return the command as a mutable list
     */
    private static List<String> getCommand(String maxHeap) {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-Xmx" + maxHeap);
        java.util.Properties props = System.getProperties();
        // stringPropertyNames() is a snapshot restricted to String keys with String values
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key);
            if (value == null) {
                // property was removed after the snapshot was taken
                continue;
            }
            // double quotes in the value are escaped with a backslash
            cmd.add("-D" + key + "=" + value.replace("\"", "\\\""));
        }
        return cmd;
    }

    /**
     * If you have a directory with, say, tika-app.jar and you want the child
     * process/server to build a parser and run it from that -- so that you can keep
     * all of those dependencies out of your client code, use this initializer.
     *
     * @param tikaBin
     *            directory containing the tika-app.jar or similar -- full jar
     *            including tika-core and all desired parsers and dependencies
     * @param pluginDir
     *            plugin directory that is passed on to every client created by
     *            this parser
     * @param factoryFactory
     *            the factory to use to generate the parser factory in the child
     *            process/server
     */
    public ForkParser(Path tikaBin, Path pluginDir, ParserFactoryFactory factoryFactory) {
        loader = null;
        parser = null;
        this.tikaBin = tikaBin;
        this.pluginDir = pluginDir;
        this.parserFactoryFactory = factoryFactory;
    }

    /**
     * <b>EXPERT</b>
     * 
     * @param tikaBin
     *            directory containing the tika-app.jar or similar -- full jar
     *            including tika-core and all desired parsers and dependencies
     * @param parserFactoryFactory
     *            -- the factory to use to generate the parser factory in the child
     *            process/server
     * @param classLoader
     *            to use for all classes besides the parser in the child
     *            process/server
     */
    public ForkParser(Path tikaBin, ParserFactoryFactory parserFactoryFactory, ClassLoader classLoader) {
        parser = null;
        loader = classLoader;
        this.tikaBin = tikaBin;
        this.parserFactoryFactory = parserFactoryFactory;
    }

    /**
     * @param loader
     *            The ClassLoader to use
     * @param parser
     *            the parser to delegate to. This one cannot be another ForkParser
     * @throws IllegalArgumentException
     *             if the given parser is itself a ForkParser
     */
    public ForkParser(ClassLoader loader, Parser parser) {
        if (parser instanceof ForkParser) {
            throw new IllegalArgumentException(
                    "The underlying parser of a ForkParser should not be a ForkParser, but a specific implementation.");
        }
        this.tikaBin = null;
        this.parserFactoryFactory = null;
        this.loader = loader;
        this.parser = parser;
    }

    /**
     * Creates a fork parser that delegates to a new {@link AutoDetectParser}.
     *
     * @param loader
     *            The ClassLoader to use
     */
    public ForkParser(ClassLoader loader) {
        this(loader, new AutoDetectParser());
    }

    /**
     * Creates a fork parser that uses the class loader of this class and
     * delegates to a new {@link AutoDetectParser}.
     */
    public ForkParser() {
        this(ForkParser.class.getClassLoader());
    }

    /**
     * Returns the size of the process pool.
     *
     * @return process pool size
     */
    public synchronized int getPoolSize() {
        return poolSize;
    }

    /**
     * Returns the command used to start the forked server process, with the
     * arguments joined by single spaces. An empty command yields an empty string.
     *
     * @return java command line
     * @deprecated since 1.8
     * @see ForkParser#getJavaCommandAsList()
     */
    @Deprecated
    public String getJavaCommand() {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String part : getJavaCommandAsList()) {
            if (!first) {
                sb.append(' ');
            }
            sb.append(part);
            first = false;
        }
        return sb.toString();
    }

    /**
     * Returns the command used to start the forked server process.
     * <p/>
     * Returned list is unmodifiable.
     * 
     * @return java command line args
     */
    public List<String> getJavaCommandAsList() {
        return Collections.unmodifiableList(java);
    }

    /**
     * Sets the command used to start the forked server process. The arguments
     * "-jar" and "/path/to/bootstrap.jar" or "-cp" and "/path/to/tika_bin" are
     * appended to the given command when starting the process. The default setting
     * is {"java", "-Xmx32m", "-Djava.awt.headless=true"}.
     * <p/>
     * Creates a defensive copy.
     * 
     * @param java
     *            java command line
     */
    public void setJavaCommand(List<String> java) {
        this.java = new ArrayList<>(java);
    }

    /**
     * Sets the command used to start the forked server process. The given command
     * line is split on whitespace (leading and trailing whitespace is ignored) and
     * the arguments "-jar" and "/path/to/bootstrap.jar" are appended to it when
     * starting the process. The default setting is
     * "java -Xmx32m -Djava.awt.headless=true".
     *
     * @param java
     *            java command line
     * @deprecated since 1.8
     * @see ForkParser#setJavaCommand(List)
     */
    @Deprecated
    public void setJavaCommand(String java) {
        // splitting on runs of whitespace avoids empty arguments
        setJavaCommand(Arrays.asList(java.trim().split("\\s+")));
    }

    /**
     * Returns the media types supported by the wrapped parser.
     *
     * @param context
     *            parse context
     * @return the supported types of the wrapped parser, or an empty set when this
     *         instance was created without a local parser (the tikaBin based
     *         constructors)
     */
    public Set<MediaType> getSupportedTypes(ParseContext context) {
        if (parser == null) {
            // the parser only exists in the forked process in this mode
            return Collections.emptySet();
        }
        return parser.getSupportedTypes(context);
    }

    /**
     *
     * This sends the objects to the server for parsing, and the server via the
     * proxies acts on the handler as if it were updating it directly.
     * <p>
     * If using a RecursiveParserWrapper, there are two options:
     * </p>
     * <p>
     * <ol>
     * <li>Send in a class that extends
     * {@link org.apache.tika.sax.RecursiveParserWrapperHandler}, and the server
     * will proxy back the data as best it can[0].</li>
     * <li>Send in a class that extends
     * {@link AbstractRecursiveParserWrapperHandler} and the server will act on the
     * class but not proxy back the data. This can be used, for example, if all you
     * want to do is write to disc, extend
     * {@link AbstractRecursiveParserWrapperHandler} to write to disc when
     * {@link AbstractRecursiveParserWrapperHandler#endDocument(ContentHandler, Metadata)}
     * is called, and the server will take care of the writing via the handler.</li>
     * </ol>
     * </p>
     * <p>
     * <b>NOTE:</b>[0] &quot;the server will proxy back the data as best it
     * can&quot;. If the handler implements Serializable and is actually
     * serializable, the server will send it and the {@link Metadata} back upon
     * {@link org.apache.tika.sax.RecursiveParserWrapperHandler#endEmbeddedDocument(ContentHandler, Metadata)}
     * or
     * {@link org.apache.tika.sax.RecursiveParserWrapperHandler#endDocument(ContentHandler, Metadata)}.
     * If the handler does not implement {@link java.io.Serializable} or if there is
     * a {@link java.io.NotSerializableException} thrown during serialization, the
     * server will call {@link ContentHandler#toString()} on the ContentHandler and
     * set that value with the
     * {@link org.apache.tika.sax.RecursiveParserWrapperHandler#TIKA_CONTENT} key
     * and then serialize and proxy that data back.
     * </p>
     *
     * @param stream
     *            the document stream (input)
     * @param handler
     *            handler for the XHTML SAX events (output)
     * @param metadata
     *            document metadata (input and output)
     * @param context
     *            parse context
     * @throws IOException
     *             if the forked parse reported an IOException
     * @throws SAXException
     *             if the forked parse reported a SAXException
     * @throws TikaException
     *             if the forked parse reported a TikaException or any other
     *             error, if communication with the forked process failed, or if
     *             the calling thread was interrupted
     */
    public void parse(InputStream stream, ContentHandler handler, Metadata metadata, ParseContext context)
            throws IOException, SAXException, TikaException {
        if (stream == null) {
            throw new NullPointerException("null stream");
        }

        // error reported by the forked parse call, null if there was none
        Throwable t;

        // whether the client may go back to the pool when released
        boolean alive = false;
        ForkClient client = null;
        TemporaryResources tmp = new TemporaryResources();
        try {
            // must create temp file because of parsers that read directly from stream when
            // creating subitems (eg. rfc822parser)
            TikaInputStream tis = TikaInputStream.get(stream, tmp);
            tis.getFile();

            client = acquireClient();

            // this here mixtures metadata of main and embedded docs. Will use
            // TeeContentHandler later
            /*
             * ContentHandler tee = (handler instanceof
             * AbstractRecursiveParserWrapperHandler) ? handler : new
             * TeeContentHandler(handler, new MetadataContentHandler(metadata));
             */
            t = client.callInBackground("parse", tis, handler, metadata, context);
            alive = true;
        } catch (TikaException te) {
            // Problem occurred on our side
            alive = true;
            throw te;
        } catch (IOException e) {
            // Problem occurred on the other side
            throw new TikaException("Failed to communicate with a forked parser process."
                    + " The process has most likely crashed due to some error"
                    + " like running out of memory. A new process will be" + " started for the next parsing request.",
                    e);
        } catch (InterruptedException e) {
            if (client != null) {
                client.close();
            }
            throw new TikaException("ForkParser Interrupted, current ForkServer closed", e);

        } finally {
            if (client != null) {
                releaseClient(client, alive);
            }
            // disposes the temporary resources created for the input stream
            IOUtil.closeQuietly(tmp);
        }

        // rethrow whatever the forked parse reported, keeping the declared types
        if (t instanceof IOException) {
            throw (IOException) t;
        } else if (t instanceof SAXException) {
            throw (SAXException) t;
        } else if (t instanceof TikaException) {
            throw (TikaException) t;
        } else if (t != null) {
            throw new TikaException("Unexpected error in forked server process", t);
        }
    }

    /**
     * Closes all pooled clients and sets the pool size to zero so that no new
     * process is started afterwards. Note that the pool size is a static setting,
     * so this affects every ForkParser instance.
     */
    public synchronized void close() {
        for (ForkClient client : pool) {
            client.close();
        }
        pool.clear();
        poolSize = 0;
    }

    /**
     * Obtains a client, either an idle one from the pool or a newly started one
     * when the pool limit allows it. Loops, waiting up to a second per round,
     * until a client that answers the ping is available.
     *
     * @return a client that answered the ping
     * @throws IOException
     *             if starting a new client fails with an IOException
     * @throws TikaException
     *             if starting a new client fails with a TikaException or the
     *             thread is interrupted while waiting
     */
    // patched to concurrently start new clients
    private ForkClient acquireClient() throws IOException, TikaException {
        while (true) {
            boolean startNew = false;
            ForkClient client;

            synchronized (this) {
                client = pool.poll();
                // Create a new process if there's room in the pool
                if (client == null && currentlyInUse < poolSize) {
                    startNew = true;
                }
                // reserve a slot; given back below if no usable client was obtained
                currentlyInUse++;
            }
            if (startNew) {
                // the client is created outside of the lock so that several
                // clients can be started at the same time
                try {
                    client = newClient();
                } catch (Throwable e) {
                    synchronized (this) {
                        currentlyInUse--;
                    }
                    throw e;
                }
            }
            // Ping the process, and get rid of it if it's inactive
            if (client != null && !client.ping()) {
                client.close();
                client = null;
            }
            synchronized (this) {
                if (client != null) {
                    return client;

                } else {
                    currentlyInUse--;
                    if (currentlyInUse >= poolSize) {
                        try {
                            wait(1000);
                        } catch (InterruptedException e) {
                            throw new TikaException("Interrupted while waiting for a fork parser", e);
                        }
                    }
                }
            }
        }
    }

    /**
     * Creates a new client using the constructor that matches the way this parser
     * was configured.
     *
     * @return the new client
     * @throws IOException
     *             propagated from the ForkClient constructor
     * @throws TikaException
     *             propagated from the ForkClient constructor
     * @throws IllegalStateException
     *             if the configured fields do not match any supported combination
     */
    private ForkClient newClient() throws IOException, TikaException {
        TimeoutLimits timeoutLimits = new TimeoutLimits(serverPulseMillis, serverParseTimeoutMillis,
                serverWaitTimeoutMillis);
        if (loader == null && parser == null && tikaBin != null && parserFactoryFactory != null) {
            return new ForkClient(tikaBin, pluginDir, parserFactoryFactory, java, timeoutLimits);
        } else if (loader != null && parser != null && tikaBin == null && parserFactoryFactory == null) {
            return new ForkClient(loader, parser, java, timeoutLimits);
        } else if (loader != null && parser == null && tikaBin != null && parserFactoryFactory != null) {
            return new ForkClient(tikaBin, pluginDir, parserFactoryFactory, loader, java, timeoutLimits);
        } else {
            // TODO: make this more useful
            throw new IllegalStateException("Unexpected combination of state items");
        }
    }

    /**
     * Gives a client back after a parse call. The client returns to the pool only
     * if it is still alive, there is room for it and it has not reached the
     * configured maximum number of processed files; otherwise it is closed.
     *
     * @param client
     *            the client to release
     * @param alive
     *            whether the client is still considered usable
     */
    private synchronized void releaseClient(ForkClient client, boolean alive) {
        currentlyInUse--;
        boolean reusable = currentlyInUse + pool.size() < poolSize && alive
                && !(maxFilesProcessedPerClient > 0 && client.getFilesProcessed() >= maxFilesProcessedPerClient);
        if (reusable) {
            pool.offer(client);
        } else {
            client.close();
        }
        // a slot was freed in every case, so wake up the threads waiting in acquireClient
        notifyAll();
    }

    /**
     * The amount of time in milliseconds that the server should wait before
     * checking to see if the parse has timed out or if the wait has timed out. The
     * default is 1000 milliseconds.
     *
     * @param serverPulseMillis
     *            milliseconds to sleep before checking if there has been any
     *            activity
     */
    public void setServerPulseMillis(long serverPulseMillis) {
        this.serverPulseMillis = serverPulseMillis;
    }

    /**
     * The maximum amount of time allowed for the server to try to parse a file. If
     * more than this time elapses, the server shuts down, and the ForkParser throws
     * an exception.
     *
     * @param serverParseTimeoutMillis
     *            parse timeout in milliseconds
     */
    public void setServerParseTimeoutMillis(long serverParseTimeoutMillis) {
        this.serverParseTimeoutMillis = serverParseTimeoutMillis;
    }

    /**
     * The maximum amount of time allowed for the server to wait for a new request
     * to parse a file. The server will shutdown after this amount of time, and a
     * new server will have to be started by a new client.
     * 
     * @param serverWaitTimeoutMillis
     *            wait timeout in milliseconds
     */
    public void setServerWaitTimeoutMillis(long serverWaitTimeoutMillis) {
        this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
    }

    /**
     * If there is a slowly building memory leak in one of the parsers, it is useful
     * to set a limit on the number of files processed by a server before it is
     * shutdown and restarted. Default value is -1.
     *
     * @param maxFilesProcessedPerClient
     *            maximum number of files that a server can handle before the parser
     *            shuts down a client and creates a new process. If set to -1, the
     *            server is never restarted because of the number of files handled.
     */
    public void setMaxFilesProcessedPerServer(int maxFilesProcessedPerClient) {
        this.maxFilesProcessedPerClient = maxFilesProcessedPerClient;
    }

}
